epoll的内核实现

您所在的位置:网站首页 linux内核 windows内核 epoll的内核实现

epoll的内核实现

#epoll的内核实现| 来源: 网络整理| 查看: 265

select/poll的缺点

A. 每次调用时重复的从用户态读入参数

B. 每次调用时全量的扫描文件描述符

C. 每次调用开始,将进程加入到每个文件描述符的等待队列,在调用结束后又把进程从等待队列中删除。

D. 在不修改内核的情况下,select最多支持1024个文件描述符。

文件系统中的一些重要结构

在linux中,进程通过file_struct结构与文件关联,而文件通过等待队列与进程关联,进而形成一种多对多的关系。

首先,文件对象struct file_struct

struct files_struct { atomic_t count; //自动增量 struct fdtable *fdt; struct fdtable fdtab; fd_set close_on_exec_init; //执行exec时需要关闭的文件描述符集合 fd_set open_fds_init; //当前打开文件的文件描述符屏蔽字 struct file *fd_array[NR_OPEN_DEFAULT];//文件对象数组 spinlock_t file_lock; };

该结构在进程的task_struct中使用,用于保存进程当前打开的文件集合。其中struct file称为文件对象,保存文件的信息,这里我们仅列出我们可能会用到的成员

struct file{ .... struct file_operations *f_op; ... }

在struct file_operations对象中存储对文件对象可以进行各种操作的指针:

struct file_operations { struct module *owner; loff_t(*llseek) (struct file *, loff_t, int); ssize_t(*read) (struct file *, char __user *, size_t, loff_t *); ssize_t(*aio_read) (struct kiocb *, char __user *, size_t, loff_t); ssize_t(*write) (struct file *, const char __user *, size_t, loff_t *); ssize_t(*aio_write) (struct kiocb *, const char __user *, size_t, loff_t); int (*readdir) (struct file *, void *, filldir_t); unsigned int (*poll) (struct file *, struct poll_table_struct *); int (*ioctl) (struct inode *, struct file *, unsigned int, unsigned long); int (*mmap) (struct file *, struct vm_area_struct *); int (*open) (struct inode *, struct file *); int (*flush) (struct file *); int (*release) (struct inode *, struct file *); int (*fsync) (struct file *, struct dentry *, int datasync); int (*aio_fsync) (struct kiocb *, int datasync); int (*fasync) (int, struct file *, int); int (*lock) (struct file *, int, struct file_lock *); ssize_t(*readv) (struct file *, const struct iovec *, unsigned long, loff_t *); ssize_t(*writev) (struct file *, const struct iovec *, unsigned long, loff_t *); ssize_t(*sendfile) (struct file *, loff_t *, size_t, read_actor_t, void __user *); ssize_t(*sendpage) (struct file *, struct page *, int, size_t, loff_t *, int); unsigned long (*get_unmapped_area) (struct file *, unsigned long, unsigned long, unsigned long, unsigned long); };

epoll模型

epoll自己保存传入的文件描述符,同时通过设备等待队列唤醒时调用“回调函数”实现事件的通知。

epoll模型将select/poll单个的操作拆分:

int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd ,struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epoll机制实现了自己特有的文件系统eventpoll filesystem。

epoll_create闯将一个属于该文件系统的文件,并返回其文件描述符。struct eventpoll 保存了epoll文件节点的扩展信息,该结构保存在private_data域中,每个由epoll_create得到的文件描述符都分配了一个该结构体,让我们来看一下struct eventpll的内部结构:

struct eventpoll { /* 用于维护自身的状态,可用于中断上下文 */ spinlock_t lock; /* * 用户进程上下文中 */ struct mutex mtx; /* 进程等待队列,由 sys_epoll_wait()使用,调用epoll_wait时,休眠在这里 */ wait_queue_head_t wq; /* 进程等待队列,由 file->poll()使用 ,epollfd本身被poll时,休眠在这里*/ wait_queue_head_t poll_wait; /* 就绪文件描述符链表 */ struct list_head rdllist; /* 红黑树头节点,该红黑树用于存储要监控的文件描述符 */ struct rb_root rbr; /* * ready事件的临时存放链表 */ struct epitem *ovflist; /* 创建eventpoll descriptor的用户 */ struct user_struct *user; };

epoll_ctl 接口加入该epoll描述符监听的套接字属于socket filesystem,每一个都对应一个epitem结构体,该结构以红黑树的方式存储,eventpoll中的rbr成员指向该红黑树的root节点,而有监听事件到来的套接字结构以双向连表的形式保存,其头结点对应eventpoll中的rdllist成员。

struct epitem { /*红黑树节点 */ struct rb_node rbn; /*就绪描述符链表节点 */ struct list_head rdllink; /* * Works together "struct eventpoll"->ovflist in keeping the * single linked chain of items. */ struct epitem *next; /* 本结构对应的文件描述符信息 */ struct epoll_filefd ffd; /* Number of active wait queue attached to poll operations */ int nwait; /* List containing poll wait queues */ struct list_head pwqlist; /* The "container" of this item */ struct eventpoll *ep; /* List header used to link this item to the "struct file" items list */ struct list_head fllink; /* The structure that describe the interested events and the source fd */ struct epoll_event event; };

上述结构中fllink是指向文件系统链表的立案表头,struct file 称为文件结构,一般代表一个打开的文件描述符。

而epoll_filefd结构则表明了epitem对应的文件描述符信息:

struct epoll_filefd { struct file *file; //文件结构指针 int fd;}; //对应的文件描述符

struct epoll_event event则表明了感兴趣的事件和原始的fd:

struct epoll_event { uint32_t events; /*想要监听的事件 */ epoll_data_t data; /* 用户数据变量,可以用来存放一些用户自定义的信息 */ } __attribute__ ((__packed__)); typedef union epoll_data { void *ptr; //指向自定义数据 int fd; uint32_t u32; uint64_t u64; } epoll_data_t;

看完上面两个结构,让我们来看一下epoll_ctl的真身:

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event) { int error; struct file *file, *tfile; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p)/n", current, epfd, op, fd, event)); error = -EFAULT; if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event))) goto error_return; /* Get the "struct file *" for the eventpoll file */ error = -EBADF; file = fget(epfd); if (!file) goto error_return; /* Get the "struct file *" for the target file */ tfile = fget(fd); if (!tfile) goto error_fput; /* The target file descriptor must support poll */ error = -EPERM; if (!tfile->f_op || !tfile->f_op->poll) goto error_tgt_fput; /* * We have to check that the file structure underneath the file descriptor * the user passed to us _is_ an eventpoll file. And also we do not permit * adding an epoll file descriptor inside itself. */ error = -EINVAL; if (file == tfile || !is_file_epoll(file)) goto error_tgt_fput; /* * At this point it is safe to assume that the "private_data" contains * our own data structure. */ ep = file->private_data; mutex_lock(&ep->mtx); /* * Try to lookup the file inside our RB tree, Since we grabbed "mtx" * above, we can be sure to be able to use the item looked up by * ep_find() till we release the mutex. */ epi = ep_find(ep, tfile, fd); error = -EINVAL; switch (op) { case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tfile, fd); } else error = -EEXIST; break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds); } else error = -ENOENT; break; } mutex_unlock(&ep->mtx); error_tgt_fput: fput(tfile); error_fput: fput(file); error_return: DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p) = %d/n", current, epfd, op, fd, event, error)); return error; }

代码的逻辑很简单,注释也很清楚。上面我们已经介绍了文件对象结构,因此我们这里主要关注剩下的流程,以添加一个监听事件为例:

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,struct file *tfile, int fd) { int error, revents, pwake = 0; unsigned long flags; struct epitem *epi; struct ep_pqueue epq; /* 不允许超过最大监听个数,每个用户均有一个监听上限*/ if (unlikely(atomic_read(&ep->user->epoll_watches) >= max_user_watches)) return -ENOSPC; if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) return -ENOMEM; /* Item initialization follow here ... */ INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR; /* Initialize the poll table using the queue callback */ epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); //注册事件响应的回调函数 /* * Attach the item to the poll hooks and get current event bits. * We can safely use the file* here because its usage count has * been increased by the caller of this function. Note that after * this operation completes, the poll callback can start hitting * the new item. */ revents = tfile->f_op->poll(tfile, &epq.pt); //执行poll函数,对于socket来说,执行tcp_poll函数 /* * We have to check if something went wrong during the poll wait queue * install process. Namely an allocation for a wait queue failed due * high memory pressure. */ error = -ENOMEM; if (epi->nwait < 0) goto error_unregister; /* Add the current item to the list of active epoll hook for this file */ spin_lock(&tfile->f_ep_lock); list_add_tail(&epi->fllink, &tfile->f_ep_links); spin_unlock(&tfile->f_ep_lock); /* * Add the current item to the RB tree. All RB tree operations are * protected by "mtx", and ep_insert() is called with "mtx" held. */ ep_rbtree_insert(ep, epi); /* We have to drop the new item inside our item list to keep track of it */ spin_lock_irqsave(&ep->lock, flags); /* If the file is already "ready" we drop it inside the ready list */ if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); /* Notify waiting tasks that events are available */ if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } spin_unlock_irqrestore(&ep->lock, flags); atomic_inc(&ep->user->epoll_watches); /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&psw, &ep->poll_wait); DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_insert(%p, %p, %d)/n", current, ep, tfile, fd)); return 0; error_unregister: ep_unregister_pollwait(ep, epi); /* * We need to do this because an event could have been arrived on some * allocated wait queue. Note that we don't care about the ep->ovflist * list, since that is used/cleaned only inside a section bound by "mtx". * And ep_insert() is called with "mtx" held. */ spin_lock_irqsave(&ep->lock, flags); if (ep_is_linked(&epi->rdllink)) list_del_init(&epi->rdllink); spin_unlock_irqrestore(&ep->lock, flags); kmem_cache_free(epi_cache, epi); return error; }

本函数将epitem对象添加到eventpoll中,实际上是将用户的输入保存到eventpoll文件系统中。init_poll_funcptr函数将ep_ptable_queue_proc函数注册到poll table中,然后程序的下一步是调用tfile的poll函数,并且函数的第二个参数为poll table。ep_ptable_queue_proc函数的主要作用是注册等待函数,并添加到指定的等待队列,第一次调用后此信息已经存在,无需在poll函数中在此调用了。

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,poll_table *pt) { struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq; if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { /* 为监听套接字注册一个等待回调函数,在唤醒时调用*/ init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait); list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { /* We have to signal that an error occurred */ epi->nwait = -1; } }

实际上,如果我们仅是使用epoll则poll函数到底是什么我们是不需要关系的,严格来说,具体poll函数是怎么样的要看通过epoll_ctl传入的套接字类型。对于tcp套接字来说可以按照创建流程找到其对应的函数为tcp_poll, 该函数将进程注册到sock成员的sk_sleep等待队列中,在对应的IO事件中该队列被唤醒,进而调用相应的等待回调函数。

让我们来看一**册的ep_poll_callback:

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) { int pwake = 0; unsigned long flags; struct epitem *epi = ep_item_from_wait(wait); struct eventpoll *ep = epi->ep; DNPRINTK(3, (KERN_INFO "[%p] eventpoll: poll_callback(%p) epi=%p ep=%p/n", current, epi->ffd.file, epi, ep)); /* 对eventpoll的spinlock加锁,因为是在中断上下文中*/ spin_lock_irqsave(&ep->lock, flags); /* 没有事件到来 * If the event mask does not contain any poll(2) event, we consider the * descriptor to be disabled. This condition is likely the effect of the * EPOLLONESHOT bit that disables the descriptor when an event is received, * until the next EPOLL_CTL_MOD will be issued. */ if (!(epi->event.events & ~EP_PRIVATE_BITS)) goto out_unlock; /* * If we are trasfering events to userspace, we can hold no locks * (because we're accessing user memory, and because of linux f_op->poll() * semantics). All the events that happens during that period of time are * chained in ep->ovflist and requeued later on. */ if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) { if (epi->next == EP_UNACTIVE_PTR) { epi->next = ep->ovflist; ep->ovflist = epi; } goto out_unlock; } /* If this file is already in the ready list we exit soon */ if (ep_is_linked(&epi->rdllink)) goto is_linked; /* 加入ready queue*/ list_add_tail(&epi->rdllink, &ep->rdllist); is_linked: /* * Wake up ( if active ) both the eventpoll wait list and the ->poll() * wait list. */ if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; out_unlock: spin_unlock_irqrestore(&ep->lock, flags); /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&psw, &ep->poll_wait); return 1; }

这里有两个队列,一个是epoll_wait调用中使用的eventpoll等待队列,用于判断是否有监听套接字可用;另一个对应于每个套接字的等待队列sk_sleep,用于判断每个监听套接字上的时间,该队列唤醒后调用ep_poll_callback,在该函数中又调用wakeup函数来唤醒前一种队列,来通知epoll_wait调用进程。

epoll_wait中调用下面的等待函数,一旦其被ep_poll_callback唤醒,则调用ep_send_events把事件复制到用户控件,进而epoll_wait返回。

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) { int res, eavail; unsigned long flags; long jtimeout; wait_queue_t wait; /* * Calculate the timeout by checking for the "infinite" value ( -1 ) * and the overflow condition. The passed timeout is in milliseconds, * that why (t * HZ) / 1000. */ jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ? MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000; retry: spin_lock_irqsave(&ep->lock, flags); res = 0; if (list_empty(&ep->rdllist)) { /* * We don't have any available event to return to the caller. * We need to sleep here, and we will be wake up by * ep_poll_callback() when events will become available. */ init_waitqueue_entry(&wait, current); wait.flags |= WQ_FLAG_EXCLUSIVE; __add_wait_queue(&ep->wq, &wait); for (;;) { /* * We don't want to sleep if the ep_poll_callback() sends us * a wakeup in between. That's why we set the task state * to TASK_INTERRUPTIBLE before doing the checks. */ set_current_state(TASK_INTERRUPTIBLE); if (!list_empty(&ep->rdllist) || !jtimeout) break; if (signal_pending(current)) { res = -EINTR; break; } spin_unlock_irqrestore(&ep->lock, flags); jtimeout = schedule_timeout(jtimeout); spin_lock_irqsave(&ep->lock, flags); } __remove_wait_queue(&ep->wq, &wait); set_current_state(TASK_RUNNING); } /* Is it worth to try to dig for events ? */ eavail = !list_empty(&ep->rdllist); spin_unlock_irqrestore(&ep->lock, flags); /* * Try to transfer events to user space. In case we get 0 events and * there's still timeout left over, we go trying again in search of * more luck. */ if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && jtimeout) goto retry; return res; }

使用epoll的整个流程可以总结如下(copy来的):

struct sock结构

该结构主要对应于一个socket描述符,通过上面的讲述再根据sock的结构,我们大致可以勾勒出一个事件到来到事件返回的全过程,涉及到Linux网络协议栈的等后面学习了再补充,当前仅贴出该数据结构:

struct sock { struct sock_common __sk_common; #define sk_family __sk_common.skc_family #define sk_state __sk_common.skc_state #define sk_reuse __sk_common.skc_reuse #define sk_bound_dev_if __sk_common.skc_bound_dev_if #define sk_node __sk_common.skc_node #define sk_bind_node __sk_common.skc_bind_node #define sk_refcnt __sk_common.skc_refcnt unsigned char sk_shutdown : 2, sk_no_check : 2, sk_userlocks : 4; unsigned char sk_protocol; unsigned short sk_type; int sk_rcvbuf; socket_lock_t sk_lock; wait_queue_head_t *sk_sleep; struct dst_entry *sk_dst_cache; struct xfrm_policy *sk_policy[2]; rwlock_t sk_dst_lock; atomic_t sk_rmem_alloc; atomic_t sk_wmem_alloc; atomic_t sk_omem_alloc; struct sk_buff_head sk_receive_queue; struct sk_buff_head sk_write_queue; int sk_wmem_queued; int sk_forward_alloc; unsigned int sk_allocation; int sk_sndbuf; int sk_route_caps; int sk_hashent; unsigned long sk_flags; unsigned long sk_lingertime; struct { struct sk_buff *head; struct sk_buff *tail; } sk_backlog; struct sk_buff_head sk_error_queue; struct proto *sk_prot; struct proto *sk_prot_creator; rwlock_t sk_callback_lock; int sk_err, sk_err_soft; unsigned short sk_ack_backlog; unsigned short sk_max_ack_backlog; __u32 sk_priority; struct ucred sk_peercred; int sk_rcvlowat; long sk_rcvtimeo; long sk_sndtimeo; struct sk_filter *sk_filter; void *sk_protinfo; struct timer_list sk_timer; struct timeval sk_stamp; struct socket *sk_socket; void *sk_user_data; struct page *sk_sndmsg_page; struct sk_buff *sk_send_head; __u32 sk_sndmsg_off; int sk_write_pending; void *sk_security; void (*sk_state_change)(struct sock *sk);//状态改变时调用 void (*sk_data_ready)(struct sock *sk, int bytes);//有数据可读时调用,即读事件 void (*sk_write_space)(struct sock *sk);//有数据可写时调用,即写事件 void (*sk_error_report)(struct sock *sk);//套接字错误时调用 int (*sk_backlog_rcv)(struct sock *sk, struct sk_buff *skb); void (*sk_destruct)(struct sock *sk);//套接字被释放时调用。 };

在该结构的最后有几个回调函数,其主要的作用就是在套接字状态发生变化时执行的回调,见名思意。

epoll使用范例

int epoll_create(int size); //size 表示要监听的文件描述符数量 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout); // timeout = -1 阻塞; timeout = 0 立即返回

epoll_event的结构如下:

typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };

events可以是一下几个标志的集合:

EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

EPOLLOUT:表示对应的文件描述符可以写;

EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

EPOLLERR:表示对应的文件描述符发生错误;

EPOLLHUP:表示对应的文件描述符被挂断;

EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

而epoll_ctl的op参数可以为一下值:

EPOLL_CTL_ADD:注册新的fd到epfd中;

EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

EPOLL_CTL_DEL:从epfd中删除一个fd;

随手附上一个简单的例子,不提供任何保证:

#define MAX_CONNECTION 1024 struct self_define_data{ int data; }; int main(int argc, char* argv[]){ int listen_fd,client_fd,flag; struct sockaddr_in server,client; struct epoll_event ee,event_list[20]; listen_fd = socket(AF_INET,SOCK_STREAM,0); /* flag = fcntl(listen_fd,F_GETFL,0); flag |= O_NONBLOCK; if(fcntl(listen_fd,F_SETFL,flag) < 0){ perror("set non_block failed"); return -1; } */ ioctl(listen_fd,FIONBIO,&n); bzero(&server,sizeof(struct sockaddr_in)); server.sin_family = AF_INET; inet_aton("127.0.0.1",&server.sin_addr); server.sin_port = htons(80); bind(listen_fd, (struct sockaddr*)&server, sizeof(struct sockaddr)); listen(listen_fd,5); int ep = epoll_create(MAX_CONNECTION); // int cycle->connection if(-1 == ep){ perror("epoll_create failed."); return -1; } struct self_define_data *self; self->data = 10; ee.events = EPOLLIN|EPOLLOUT|EPOLLET; ee.data.ptr = (void*)self; ee.data.fd = listen_fd; epoll_ctl(ep,EPOLL_CTL_ADD,listen_fd, &ee); while(1){ int num = epoll_wait(ep,event_list,20,-1); for(int i = 0; i < num; i++){ struct sef_define_data s = event_list[i].data.ptr; if(10 == s.data){ uint32_t revent = event_list[i].events;//revent中包含返回的事件如EPOLLIN if(event_list[i].data.fd == listen_fd){ client_fd = accept(listen_fd,(struct sockaddr*)&client,sizeof(struct sockaddr)); //do someting if(client_fd < 0) break; close(client_fd); } } } } if(-1 == close(ep)){ perror("epoll_close failed."); return -1; } return 0; }



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3